{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# ETL add new etl process\n",
    "> Add your custom ETL process to the ETL pipeline."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## 🌌 Original ETL Pipeline \n",
    "> This is simple ETL pipeline to load huggingface dataset"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "spark:\n",
      "  appname: ETL\n",
      "  driver:\n",
      "    memory: 16g\n",
      "etl:\n",
      "- name: data_ingestion___huggingface___hf2raw\n",
      "  args:\n",
      "    name_or_path:\n",
      "    - ai2_arc\n",
      "    - ARC-Challenge\n",
      "- name: data_load___huggingface___ufl2hf_obj\n",
      "\n"
     ]
    }
   ],
   "source": [
    "from omegaconf import OmegaConf\n",
    "\n",
    "# load from dict\n",
    "ETL_config = OmegaConf.create({\n",
    "    'spark': {\n",
    "        'appname': 'ETL',\n",
    "        'driver': {'memory': '16g'},\n",
    "    },\n",
    "    'etl': [\n",
    "        {\n",
    "            'name': 'data_ingestion___huggingface___hf2raw',\n",
    "            'args': {'name_or_path': ['ai2_arc', 'ARC-Challenge']}\n",
    "        },\n",
    "        {\n",
    "            'name': 'data_load___huggingface___ufl2hf_obj'\n",
    "        }\n",
    "    ]\n",
    "})\n",
    "\n",
    "print(OmegaConf.to_yaml(ETL_config))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {},
   "outputs": [
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "Setting default log level to \"WARN\".\n",
      "To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).\n",
      "23/11/14 19:26:56 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable\n",
      "23/11/14 19:26:56 WARN SparkConf: Note that spark.local.dir will be overridden by the value set by the cluster manager (via SPARK_LOCAL_DIRS in mesos/standalone/kubernetes and LOCAL_DIRS in YARN).\n",
      "23/11/14 19:26:56 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.\n",
      "Found cached dataset ai2_arc (/root/.cache/huggingface/datasets/ai2_arc/ARC-Challenge/1.0.0/1569c2591ea2683779581d9fb467203d9aa95543bb9b75dcfde5da92529fd7f6)\n"
     ]
    },
    {
     "data": {
      "application/vnd.jupyter.widget-view+json": {
       "model_id": "0aeed70bb9b34721aa5f6e8abf72a85f",
       "version_major": 2,
       "version_minor": 0
      },
      "text/plain": [
       "  0%|          | 0/3 [00:00<?, ?it/s]"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    },
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "                                                                                \r"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Downloading and preparing dataset spark/14056872 to /root/.cache/huggingface/datasets/spark/14056872/0.0.0...\n"
     ]
    },
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "                                                                                \r"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Dataset spark downloaded and prepared to /root/.cache/huggingface/datasets/spark/14056872/0.0.0. Subsequent calls will reuse this data.\n"
     ]
    },
    {
     "data": {
      "text/plain": [
       "Dataset({\n",
       "    features: ['answerKey', 'choices', 'id', 'question'],\n",
       "    num_rows: 2590\n",
       "})"
      ]
     },
     "execution_count": 2,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "from dataverse.etl import ETLPipeline\n",
    "\n",
    "etl_pipeline = ETLPipeline()\n",
    "\n",
    "# raw -> hf_obj\n",
    "spark, dataset = etl_pipeline.run(ETL_config)\n",
    "dataset"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "{'answerKey': 'A',\n",
       " 'choices': {'text': ['loss of electrons.',\n",
       "   'loss of protons.',\n",
       "   'gain of electrons.',\n",
       "   'gain of protons.'],\n",
       "  'label': ['A', 'B', 'C', 'D']},\n",
       " 'id': 'Mercury_7029645',\n",
       " 'question': 'Metal atoms will most likely form ions by the'}"
      ]
     },
     "execution_count": 3,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "dataset[0]"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## 🌌 Add Custom ETL Process\n",
    "\n",
    "1. create your custom ETL process\n",
    "2. check ETL process is registered\n",
    "3. wrap it with `register_etl` decorator\n",
    "4. add your custom ETL process to the ETL config\n",
    "5. run the ETL pipeline\n",
    "\n",
    "Here you are going to make a simple "
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {},
   "outputs": [],
   "source": [
    "from dataverse.etl import register_etl"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### 🌠 1. create your custom ETL process\n",
    "\n",
    "- naming convention is `cate___sub-cate___name`\n",
    "    - e.g. `huggingface___dataset___load_dataset`\n",
    "- for input because we are using huggingface dataset `List[Dict]` format will be inserted\n",
    "\n",
    "```python\n",
    "# ai2_arc format\n",
    "[\n",
    "    {\n",
    "        'id': ...,\n",
    "        'choices': ...,\n",
    "        'question': ...,\n",
    "        'answerKey': ...,\n",
    "    },\n",
    "    {...},\n",
    "    ...\n",
    "]\n",
    "```\n",
    "\n",
    "Make a spark process assuming `List[Dict]` is given. Here we are simply going to remove `choices` key from each data point"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "metadata": {},
   "outputs": [],
   "source": [
    "def your___custom___etl_process(spark, data, *args, **kwargs):\n",
    "    # add your custom process here\n",
    "    # here we are going to simply remove 'choices' key\n",
    "    data = data.map(lambda x: {k: v for k, v in x.items() if k != 'choices'})\n",
    "\n",
    "    return data"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### 🌠 2. check ETL process is registered\n",
    "\n",
    "ETL Pipeline only runs registered ETL process"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "==================================================\n",
       "Total [ 54 ]\n",
       "==================================================\n",
       "data_ingestion [ 18 ]\n",
       "deduplication [ 6 ]\n",
       "cleaning [ 20 ]\n",
       "pii [ 2 ]\n",
       "quality [ 2 ]\n",
       "data_load [ 4 ]\n",
       "utils [ 2 ]"
      ]
     },
     "execution_count": 6,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "from dataverse.etl import ETLRegistry \n",
    "\n",
    "# we can see our custom is not registered yet\n",
    "ETLRegistry()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### 🌠 3. wrap it with `register_etl` decorator\n",
    "\n",
    "How to register your custom ETL process?\n",
    "Simply wrap it with `register_etl` decorator\n",
    "\n",
    "```python\n",
    "@register_etl\n",
    "def your_custom_etl_process():"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "metadata": {},
   "outputs": [],
   "source": [
    "@register_etl\n",
    "def your___custom___etl_process(spark, data, *args, **kwargs):\n",
    "    # remove all text\n",
    "    data = data.map(lambda x: {k: v for k, v in x.items() if k != 'choices'})\n",
    "\n",
    "    return data"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 8,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "==================================================\n",
       "Total [ 55 ]\n",
       "==================================================\n",
       "data_ingestion [ 18 ]\n",
       "deduplication [ 6 ]\n",
       "cleaning [ 20 ]\n",
       "pii [ 2 ]\n",
       "quality [ 2 ]\n",
       "data_load [ 4 ]\n",
       "utils [ 2 ]\n",
       "your [ 1 ]"
      ]
     },
     "execution_count": 8,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "# you will see your custom etl is registered\n",
    "ETLRegistry()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### 🌠 4. add your custom ETL process to the ETL config\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 9,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "spark:\n",
      "  appname: ETL\n",
      "  driver:\n",
      "    memory: 16g\n",
      "etl:\n",
      "- name: data_ingestion___huggingface___hf2raw\n",
      "  args:\n",
      "    name_or_path:\n",
      "    - ai2_arc\n",
      "    - ARC-Challenge\n",
      "- name: your___custom___etl_process\n",
      "- name: data_load___huggingface___ufl2hf_obj\n",
      "\n"
     ]
    }
   ],
   "source": [
    "from omegaconf import OmegaConf\n",
    "\n",
    "# load from dict\n",
    "ETL_config = OmegaConf.create({\n",
    "    'spark': {\n",
    "        'appname': 'ETL',\n",
    "        'driver': {'memory': '16g'},\n",
    "    },\n",
    "    'etl': [\n",
    "        {\n",
    "            'name': 'data_ingestion___huggingface___hf2raw',\n",
    "            'args': {'name_or_path': ['ai2_arc', 'ARC-Challenge']}\n",
    "        },\n",
    "\n",
    "        # ======== add your custom etl here ========\n",
    "        {\n",
    "            'name': 'your___custom___etl_process'\n",
    "        },\n",
    "        # ==========================================\n",
    "\n",
    "        {\n",
    "            'name': 'data_load___huggingface___ufl2hf_obj'\n",
    "        }\n",
    "    ]\n",
    "})\n",
    "\n",
    "print(OmegaConf.to_yaml(ETL_config))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### 🌠 5. run the ETL pipeline\n",
    "\n",
    "You can check that ETL process you added customly works great and `choices` are removed."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 10,
   "metadata": {},
   "outputs": [
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "23/11/14 19:27:13 WARN SparkConf: Note that spark.local.dir will be overridden by the value set by the cluster manager (via SPARK_LOCAL_DIRS in mesos/standalone/kubernetes and LOCAL_DIRS in YARN).\n",
      "23/11/14 19:27:13 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.\n"
     ]
    },
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "Found cached dataset ai2_arc (/root/.cache/huggingface/datasets/ai2_arc/ARC-Challenge/1.0.0/1569c2591ea2683779581d9fb467203d9aa95543bb9b75dcfde5da92529fd7f6)\n"
     ]
    },
    {
     "data": {
      "application/vnd.jupyter.widget-view+json": {
       "model_id": "7ef3d804674d408ba6696c00e6e58bd1",
       "version_major": 2,
       "version_minor": 0
      },
      "text/plain": [
       "  0%|          | 0/3 [00:00<?, ?it/s]"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    },
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "                                                                                \r"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Downloading and preparing dataset spark/1082445423 to /root/.cache/huggingface/datasets/spark/1082445423/0.0.0...\n"
     ]
    },
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "                                                                                \r"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Dataset spark downloaded and prepared to /root/.cache/huggingface/datasets/spark/1082445423/0.0.0. Subsequent calls will reuse this data.\n"
     ]
    },
    {
     "data": {
      "text/plain": [
       "Dataset({\n",
       "    features: ['answerKey', 'id', 'question'],\n",
       "    num_rows: 2590\n",
       "})"
      ]
     },
     "execution_count": 10,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "from dataverse.etl import ETLPipeline\n",
    "\n",
    "etl_pipeline = ETLPipeline()\n",
    "\n",
    "# raw -> hf_obj\n",
    "spark, dataset = etl_pipeline.run(ETL_config)\n",
    "dataset"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 11,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "{'answerKey': 'A',\n",
       " 'id': 'Mercury_7029645',\n",
       " 'question': 'Metal atoms will most likely form ions by the'}"
      ]
     },
     "execution_count": 11,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "dataset[0]"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "llm",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.10.11"
  },
  "orig_nbformat": 4
 },
 "nbformat": 4,
 "nbformat_minor": 2
}
